Skip to content

Commit d9108f2

Browse files
committed
Implement affiliate payout queue
1 parent 9e88a91 commit d9108f2

File tree

8 files changed

+1285
-4
lines changed

8 files changed

+1285
-4
lines changed

apps/labrinth/.env.docker-compose

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,5 @@ GOTENBERG_URL=http://labrinth-gotenberg:13000
146146
GOTENBERG_CALLBACK_BASE=http://host.docker.internal:8000/_internal/gotenberg
147147

148148
ARCHON_URL=none
149+
150+
DEFAULT_AFFILIATE_REVENUE_SPLIT=0.1

apps/labrinth/.env.local

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,3 +147,5 @@ GOTENBERG_URL=http://localhost:13000
147147
GOTENBERG_CALLBACK_BASE=http://host.docker.internal:8000/_internal/gotenberg
148148

149149
ARCHON_URL=none
150+
151+
DEFAULT_AFFILIATE_REVENUE_SPLIT=0.1

apps/labrinth/migrations/20251024182919_subscription_affiliations.sql

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
CREATE TABLE users_subscriptions_affiliations (
2-
id BIGSERIAL NOT NULL PRIMARY KEY,
2+
id BIGSERIAL NOT NULL PRIMARY KEY,
33
subscription_id BIGINT NOT NULL REFERENCES users_subscriptions(id),
44
affiliate_code BIGINT NOT NULL REFERENCES affiliate_codes(id),
5+
created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP,
56
deactivated_at TIMESTAMPTZ
67
);
78

@@ -13,3 +14,6 @@ CREATE TABLE users_subscriptions_affiliations_payouts(
1314
payout_value_id BIGSERIAL NOT NULL REFERENCES payouts_values(id),
1415
UNIQUE (charge_id)
1516
);
17+
18+
ALTER TABLE payouts_values
19+
ADD COLUMN affiliate_code_source BIGINT;

apps/labrinth/src/background_task.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use crate::queue::billing::{index_billing, index_subscriptions};
33
use crate::queue::email::EmailQueue;
44
use crate::queue::payouts::{
55
PayoutsQueue, index_payouts_notifications,
6-
insert_bank_balances_and_webhook, process_payout,
6+
insert_bank_balances_and_webhook, process_affiliate_payouts,
7+
process_payout,
78
};
89
use crate::search::indexing::index_projects;
910
use crate::util::anrok;
@@ -179,12 +180,17 @@ pub async fn payouts(
179180
info!("Started running payouts");
180181
let result = process_payout(&pool, &clickhouse).await;
181182
if let Err(e) = result {
182-
warn!("Payouts run failed: {:?}", e);
183+
warn!("Payouts run failed: {e:#?}");
183184
}
184185

185186
let result = index_payouts_notifications(&pool, &redis_pool).await;
186187
if let Err(e) = result {
187-
warn!("Payouts notifications indexing failed: {:?}", e);
188+
warn!("Payouts notifications indexing failed: {e:#?}");
189+
}
190+
191+
let result = process_affiliate_payouts(&pool).await;
192+
if let Err(e) = result {
193+
warn!("Affiliate payouts run failed: {e:#?}");
188194
}
189195

190196
info!("Done running payouts");

apps/labrinth/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -526,5 +526,7 @@ pub fn check_env_vars() -> bool {
526526

527527
failed |= check_var::<String>("ARCHON_URL");
528528

529+
failed |= check_var::<String>("DEFAULT_AFFILIATE_REVENUE_SPLIT");
530+
529531
failed
530532
}

apps/labrinth/src/queue/payouts.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ use std::collections::HashMap;
2626
use tokio::sync::RwLock;
2727
use tracing::{error, info};
2828

29+
mod affiliate;
30+
pub use affiliate::process_affiliate_payouts;
31+
2932
pub struct PayoutsQueue {
3033
credential: RwLock<Option<PayPalCredentials>>,
3134
payout_options: RwLock<Option<PayoutMethods>>,
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
use std::collections::HashMap;
2+
3+
use chrono::{DateTime, Datelike, Duration, TimeZone, Utc};
4+
use eyre::{Context, Result, eyre};
5+
use rust_decimal::Decimal;
6+
use sqlx::PgPool;
7+
use tracing::warn;
8+
9+
use crate::database::models::{DBAffiliateCodeId, DBUserId};
10+
11+
pub async fn process_affiliate_payouts(postgres: &PgPool) -> Result<()> {
12+
/// Data for an (affiliate user, affiliate code) pair.
13+
#[derive(Debug, Default)]
14+
struct AffiliatePayoutInfo {
15+
/// How much the affiliate will earn from this code.
16+
amount: Decimal,
17+
/// Which (charge, subscription) pairs will be linked to this payout.
18+
charge_subscription_ids: Vec<(i64, i64)>,
19+
}
20+
21+
// process:
22+
// - get any subscriptions which are in `users_subscriptions_affiliations`
23+
// - for those subscriptions, get any charges which are not in `users_subscriptions_affiliations_payouts`
24+
// - for each of those charges,
25+
// - get the subscription's `affiliate_code`
26+
// - get the affiliate user of that code
27+
// - add a payout for that affiliate user, proportional to the net of the charge
28+
// - add a record of this into `users_subscriptions_affiliations_payouts`
29+
30+
let mut txn = postgres
31+
.begin()
32+
.await
33+
.wrap_err("failed to begin transaction")?;
34+
35+
let rows = sqlx::query!(
36+
r#"
37+
SELECT
38+
c.id as charge_id,
39+
c.subscription_id AS "subscription_id!",
40+
c.net as charge_net,
41+
c.currency_code,
42+
usa.affiliate_code,
43+
ac.affiliate as affiliate_user_id,
44+
ac.revenue_split
45+
-- get any charges...
46+
FROM charges c
47+
-- ...which have a subscription...
48+
INNER JOIN users_subscriptions_affiliations usa
49+
ON c.subscription_id = usa.subscription_id
50+
AND c.subscription_id IS NOT NULL
51+
-- ...which have an affiliate code...
52+
INNER JOIN affiliate_codes ac
53+
ON usa.affiliate_code = ac.id
54+
AND usa.deactivated_at IS NULL
55+
-- ...and where no payout to an affiliate has been made for this charge yet
56+
LEFT JOIN users_subscriptions_affiliations_payouts usap
57+
ON c.id = usap.charge_id
58+
WHERE
59+
c.status = 'succeeded'
60+
AND c.net > 0
61+
AND usap.id IS NULL
62+
"#
63+
)
64+
.fetch_all(&mut *txn)
65+
.await
66+
.wrap_err("failed to fetch charges awaiting affiliate payout")?;
67+
68+
let default_affiliate_revenue_split =
69+
dotenvy::var("DEFAULT_AFFILIATE_REVENUE_SPLIT")
70+
.wrap_err("no env var `DEFAULT_AFFILIATE_REVENUE_SPLIT`")?
71+
.parse::<Decimal>()
72+
.wrap_err("`DEFAULT_AFFILIATE_REVENUE_SPLIT` is not a decimal")?;
73+
74+
let now = Utc::now();
75+
let start: DateTime<Utc> = DateTime::from_naive_utc_and_offset(
76+
(now - Duration::days(1))
77+
.date_naive()
78+
.and_hms_nano_opt(0, 0, 0, 0)
79+
.unwrap_or_default(),
80+
Utc,
81+
);
82+
83+
// affiliate payouts are Net 60 from the end of the month
84+
let available = {
85+
let now = Utc::now().date_naive();
86+
87+
let year = now.year();
88+
let month = now.month();
89+
90+
// get the first day of the next month
91+
let last_day_of_month = if month == 12 {
92+
Utc.with_ymd_and_hms(year + 1, 1, 1, 0, 0, 0).unwrap()
93+
} else {
94+
Utc.with_ymd_and_hms(year, month + 1, 1, 0, 0, 0).unwrap()
95+
};
96+
97+
last_day_of_month + Duration::days(59)
98+
};
99+
100+
// collect the rev from each affiliate and their code, and sum up values
101+
let mut payouts =
102+
HashMap::<(DBUserId, DBAffiliateCodeId), AffiliatePayoutInfo>::new();
103+
104+
for row in rows {
105+
let Some(net) = row.charge_net else {
106+
warn!(
107+
"Charge {} has no net amount; cannot calculate affiliate payout",
108+
row.charge_id
109+
);
110+
continue;
111+
};
112+
let net = Decimal::new(net, 2);
113+
114+
let revenue_split = row
115+
.revenue_split
116+
.and_then(Decimal::from_f64_retain)
117+
.unwrap_or(default_affiliate_revenue_split);
118+
if !(Decimal::from(0)..=Decimal::from(1)).contains(&revenue_split) {
119+
warn!(
120+
"Charge {} has revenue split {} which is out of range",
121+
row.charge_id, revenue_split
122+
);
123+
continue;
124+
}
125+
126+
let affiliate_cut = net * revenue_split;
127+
let affiliate_user_id = DBUserId(row.affiliate_user_id);
128+
let affiliate_code_id = DBAffiliateCodeId(row.affiliate_code);
129+
130+
let payout_info = payouts
131+
.entry((affiliate_user_id, affiliate_code_id))
132+
.or_default();
133+
// a portion of this charge will be added as a payout to the affiliate...
134+
payout_info.amount += affiliate_cut;
135+
payout_info
136+
.charge_subscription_ids
137+
.push((row.charge_id, row.subscription_id));
138+
}
139+
140+
for ((affiliate_id, affiliate_code_id), payout_info) in payouts {
141+
let payout_value_id = sqlx::query!(
142+
"
143+
INSERT INTO payouts_values
144+
(user_id, amount, created,
145+
date_available, affiliate_code_source)
146+
VALUES ($1, $2, $3, $4, $5)
147+
RETURNING id
148+
",
149+
affiliate_id.0,
150+
payout_info.amount,
151+
start,
152+
available,
153+
affiliate_code_id.0,
154+
)
155+
.fetch_one(&mut *txn)
156+
.await
157+
.wrap_err_with(|| eyre!("failed to insert payout value for ({affiliate_id:?}, {affiliate_code_id:?})"))?
158+
.id;
159+
160+
let (
161+
mut insert_usap_charges,
162+
mut insert_usap_subscriptions,
163+
mut insert_usap_affiliate_codes,
164+
mut insert_usap_payout_values,
165+
) = (Vec::new(), Vec::new(), Vec::new(), Vec::new());
166+
167+
for (charge_id, subscription_id) in payout_info.charge_subscription_ids
168+
{
169+
insert_usap_charges.push(charge_id);
170+
insert_usap_subscriptions.push(subscription_id);
171+
insert_usap_affiliate_codes.push(affiliate_code_id.0);
172+
insert_usap_payout_values.push(payout_value_id);
173+
}
174+
175+
sqlx::query!(
176+
"
177+
INSERT INTO users_subscriptions_affiliations_payouts
178+
(charge_id, subscription_id,
179+
affiliate_code, payout_value_id)
180+
SELECT * FROM UNNEST($1::bigint[], $2::bigint[], $3::bigint[], $4::bigint[])
181+
",
182+
&insert_usap_charges[..],
183+
&insert_usap_subscriptions[..],
184+
&insert_usap_affiliate_codes[..],
185+
&insert_usap_payout_values[..],
186+
)
187+
.execute(&mut *txn)
188+
.await
189+
.wrap_err("failed to associate charges with affiliate payouts")?;
190+
}
191+
192+
txn.commit()
193+
.await
194+
.wrap_err("failed to commit transaction")?;
195+
196+
Ok(())
197+
}

0 commit comments

Comments
 (0)